
feat: real-time inference progress events for web channel #514

Merged
senamakel merged 4 commits into tinyhumansai:main from senamakel:feat/streaming
Apr 12, 2026

Conversation


@senamakel senamakel commented Apr 12, 2026

Summary

  • Adds real-time progress events during the agent turn loop so the web channel (and app UI) can show live status updates instead of retroactive tool-call batches
  • Introduces AgentProgress enum with events: TurnStarted, IterationStarted, ToolCallStarted, ToolCallCompleted, SubagentSpawned, SubagentCompleted, SubagentFailed, TurnCompleted
  • Replaces the retroactive publish_tool_events_from_history() with a real-time spawn_progress_bridge() that maps progress events to WebChannelEvents over socket.io
  • Frontend now shows a live inference status indicator (thinking/tool use/sub-agent phases) and receives tool timeline updates in real-time

Changes

Rust core

  • New: src/openhuman/agent/progress.rs — AgentProgress event enum
  • Modified: Agent struct gains on_progress: Option<mpsc::Sender<AgentProgress>> field + set_on_progress() setter
  • Modified: turn.rs — emits progress events at turn start, each iteration, tool call start/complete, and turn end
  • Modified: web.rs — creates per-request progress channel, spawns bridge task that maps AgentProgress → WebChannelEvent, removed retroactive publish_tool_events_from_history()

New socket events

| Event | When |
| --- | --- |
| inference_start | Agent turn begins |
| iteration_start | Each LLM call iteration starts |
| tool_call | Tool about to execute (now real-time) |
| tool_result | Tool completed (now real-time) |
| subagent_spawned | Sub-agent dispatched |
| subagent_completed / subagent_failed | Sub-agent finished |

Frontend

  • Modified: chatService.ts — new event types and subscribeChatEvents listeners for all new events
  • Modified: Conversations.tsx — InferenceStatus state tracking, live pulsing indicator showing current phase, sub-agents in tool timeline

Test plan

  • Send a message via web channel and verify inference_start fires immediately
  • Verify tool_call events appear in real-time as tools execute (not batched after completion)
  • Verify tool_result events update the timeline status as each tool finishes
  • Verify the inference status indicator shows "Thinking..." during LLM calls
  • Verify the indicator switches to "Running <tool_name>..." during tool execution
  • Verify inference status clears on chat_done and chat_error
  • Verify cancellation still works correctly
  • cargo check passes
  • tsc --noEmit passes

Summary by CodeRabbit

  • New Features

    • Real-time inference status indicator showing per-turn reasoning, iteration progress, active tool, and sub-agent lifecycle above the tool timeline
    • Live progress stream with detailed tool execution metrics (tool name, iteration, elapsed time, output size)
  • Improvements

    • Replaced retroactive tool reporting with immediate event streaming for instant feedback
    • Per-session enable/disable of progress streaming for controlled telemetry
  • Tests

    • Updated SSE test helper to read and assert specific event types from the event stream

- Added new event listeners for inference start, iteration start, subagent spawning, and completion to track the live state of chat interactions.
- Introduced an `InferenceStatus` interface to manage the current phase and active tools/subagents for each thread.
- Updated the UI to display inference status indicators, enhancing user experience during chat interactions.
- Created a new `progress` module in the Rust backend to emit real-time progress events, allowing for better integration with the web channel.
- Refactored the `subscribeChatEvents` function to include new event handlers for managing inference and subagent events, improving clarity and maintainability of the event handling logic.

coderabbitai bot commented Apr 12, 2026

📝 Walkthrough

Walkthrough

Adds real-time agent progress streaming: Rust Agent emits typed AgentProgress events during turns/iterations/tool calls/subagents; a bridge publishes these as websocket chat events; frontend subscribes to new inference/iteration/subagent events and renders per-thread inference status and tool-timeline updates.

Changes

  • Frontend Chat Event Infrastructure (app/src/services/chatService.ts): Added new exported event interfaces and optional lifecycle callbacks (onInferenceStart, onIterationStart, onSubagentSpawned, onSubagentDone); subscribeChatEvents registers new socket handlers and includes cleanup.
  • Frontend Inference Status UI (app/src/pages/Conversations.tsx): Added inferenceStatusByThread state and selectedInferenceStatus; implemented lifecycle callbacks to track inference phases (thinking, tool_use, subagent), iteration index, sub-agent timeline entries, and activeTool, with cleanup on done/error; rendered the inference status indicator above the tool timeline.
  • Rust Agent Progress Types (src/openhuman/agent/progress.rs, src/openhuman/agent/mod.rs): New public AgentProgress enum with turn/iteration/tool/subagent variants; exported via pub mod progress;.
  • Rust Agent Session Types & API (src/openhuman/agent/harness/session/types.rs, src/openhuman/agent/harness/session/builder.rs, src/openhuman/agent/harness/session/runtime.rs): Added on_progress: Option<mpsc::Sender<AgentProgress>> to Agent; builder initializes it to None; added Agent::set_on_progress(...) to attach/detach the sender.
  • Rust Agent Execution Instrumentation (src/openhuman/agent/harness/session/turn.rs): Emits AgentProgress events (TurnStarted, IterationStarted, ToolCallStarted/ToolCallCompleted, Subagent*, TurnCompleted); added emit_progress helper; updated execute_tool_call/execute_tools signatures to include iteration.
  • Real-Time Progress Bridge & Web Channel (src/openhuman/channels/providers/web.rs): Removed retroactive history-based tool event publishing; creates mpsc progress channel in run_chat_task, registers it with the agent, spawns spawn_progress_bridge that maps AgentProgress → websocket events (inference_start, iteration_start, tool_call, tool_result, subagent_spawned, subagent_completed, subagent_failed, turn_completed); clears the progress sender after the run.
  • Tests / SSE Helpers (tests/json_rpc_e2e.rs): Added SSE helper read_sse_event_by_type to read targeted SSE events from the stream; updated test to wait for the chat_done event.

Sequence Diagram

sequenceDiagram
    participant Agent as Agent Turn Loop
    participant ProgressTx as Progress<br/>Channel (Tx)
    participant Bridge as Progress Bridge
    participant WebEvent as Web Event Publish
    participant Socket as WebSocket
    participant Service as Chat Service
    participant UI as Conversations UI

    rect rgba(100,150,200,0.5)
        Note over Agent,UI: Real-Time progress streaming
    end

    Agent->>ProgressTx: emit_progress(TurnStarted / IterationStarted / ToolCallStarted)
    ProgressTx->>Bridge: deliver AgentProgress
    Bridge->>WebEvent: map to websocket event (inference_start / iteration_start / tool_call)
    WebEvent->>Socket: emit WebChannelEvent
    Socket->>Service: subscribeChatEvents handler
    Service->>UI: onInferenceStart / onIterationStart / onToolCall callbacks
    UI->>UI: update inferenceStatusByThread, phase, timeline

    Agent->>ProgressTx: emit_progress(SubagentSpawned / ToolCallCompleted / TurnCompleted)
    ProgressTx->>Bridge: deliver AgentProgress
    Bridge->>WebEvent: map to subagent/tool_result/turn_completed
    WebEvent->>Socket: emit WebChannelEvent
    Socket->>Service: handler
    Service->>UI: onSubagentSpawned / onToolResult / onDone
    UI->>UI: update timeline entries / clear onDone

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~50 minutes


Suggested reviewers

  • graycyrus

Poem

🐇 I hop on bytes with ears upright,
emitting progress through the night.
Iterations drum, subagents prance—
timelines glow, the tools advance.
A little rabbit cheers, eyes bright! 🥕✨

🚥 Pre-merge checks | ✅ 3
✅ Passed checks (3 passed)
| Check name | Status | Explanation |
| --- | --- | --- |
| Description Check | ✅ Passed | Check skipped - CodeRabbit's high-level summary is enabled. |
| Title check | ✅ Passed | The title accurately describes the main feature: adding real-time inference progress events specifically for the web channel, which is the core of this PR. |
| Docstring Coverage | ✅ Passed | Docstring coverage is 100.00%, which is sufficient. The required threshold is 80.00%. |




@coderabbitai coderabbitai bot left a comment


Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
src/openhuman/channels/providers/web.rs (1)

489-500: ⚠️ Potential issue | 🟡 Minor

Remove the unused parse_tool_args function and its test.

The function is only called in tests and has no production usage. With the removal of history-based tool event publishing, this function is now dead code.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/openhuman/channels/providers/web.rs` around lines 489 - 500, Remove the
dead helper by deleting the parse_tool_args function (fn
parse_tool_args(arguments: &str) -> Value) from
src/openhuman/channels/providers/web.rs and also remove the associated unit
test(s) that only reference this helper; search for references to
parse_tool_args in the repo and ensure any test file (or #[cfg(test)] block)
that solely exists to exercise this function is removed or refactored to no
longer call it, then run cargo test to confirm no remaining references.
app/src/services/chatService.ts (1)

143-186: ⚠️ Potential issue | 🟠 Major

Add trace logs for the new progress-event handlers.

Line 143–Line 186 introduces the new real-time inference/subagent flow, but there are no namespaced debug checkpoints when payloads are received and forwarded. Please add debug logs (with stable prefix + correlation fields like thread_id, request_id, round, event, success) in these callbacks so sequencing/cancellation issues can be traced end-to-end.

As per coding guidelines, "Add substantial, development-oriented logs on new/changed flows in TypeScript/React app code; use namespaced debug logs and dev-only detail as needed" and "Use grep-friendly log prefixes ([feature], domain name, or JSON-RPC method) in app code for correlation with sidecar and Tauri output".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@app/src/services/chatService.ts` around lines 143 - 186, Add namespaced debug
logs inside each progress-event callback (the callbacks registered for
listeners.onInferenceStart, onIterationStart, onToolCall, onToolResult,
onSubagentSpawned, and the onSubagentDone handlers) so every received payload is
logged before forwarding; include grep-friendly prefix (e.g.
"[realtime][chat]"), and correlation fields thread_id, request_id, round, event
(use EVENTS.* symbol name) and success (boolean) in the log entry. Ensure logs
are emitted only in dev/debug builds if needed (wrap with the app's dev-check)
and keep the logging placement in the existing cb/onCompleted/onFailed functions
so sequencing/cancellation can be traced end-to-end.
🧹 Nitpick comments (1)
src/openhuman/channels/providers/web.rs (1)

396-406: Consider using serde_json::json! for the output payload.

The manual JSON string formatting is fragile and could break with special characters in future fields. Using the macro would be safer and more consistent with the rest of the codebase.

♻️ Suggested improvement
-                        output: Some(format!(
-                            "{{\"output_chars\":{output_chars},\"elapsed_ms\":{elapsed_ms}}}"
-                        )),
+                        output: Some(
+                            serde_json::json!({
+                                "output_chars": output_chars,
+                                "elapsed_ms": elapsed_ms
+                            })
+                            .to_string(),
+                        ),
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/openhuman/channels/providers/web.rs` around lines 396 - 406, The output
field currently builds JSON via string formatting (output: Some(format!(...)))
which is fragile; replace that with serde_json::json! to construct a proper JSON
value (e.g., json!({"output_chars": output_chars, "elapsed_ms": elapsed_ms}))
and then serialize it to a String (to_string() or serde_json::to_string())
before assigning to the output field; update the import to use serde_json::json
if not already present and ensure this change is applied where the struct is
constructed (the block setting skill_id, args, output, success, round, etc.).

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 139802f4-e56c-4359-ba25-6afadb8f12d9

📥 Commits

Reviewing files that changed from the base of the PR and between ace0006 and 4061bca.

📒 Files selected for processing (9)
  • app/src/pages/Conversations.tsx
  • app/src/services/chatService.ts
  • src/openhuman/agent/harness/session/builder.rs
  • src/openhuman/agent/harness/session/runtime.rs
  • src/openhuman/agent/harness/session/turn.rs
  • src/openhuman/agent/harness/session/types.rs
  • src/openhuman/agent/mod.rs
  • src/openhuman/agent/progress.rs
  • src/openhuman/channels/providers/web.rs

- Resolve merge conflict in agent/mod.rs (keep both pformat and progress)
- Add namespaced debug logs ([realtime][chat]) to all progress event
  callbacks in chatService.ts with correlation fields
- Remove dead parse_tool_args helper and its unit test
- Replace fragile string-formatted JSON with serde_json::json! in
  ToolCallCompleted output field

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2


ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 0dd0cc30-bae8-4139-bc54-f2027bb651fb

📥 Commits

Reviewing files that changed from the base of the PR and between 4061bca and 82dcc16.

📒 Files selected for processing (6)
  • app/src/services/chatService.ts
  • src/openhuman/agent/harness/session/builder.rs
  • src/openhuman/agent/harness/session/turn.rs
  • src/openhuman/agent/harness/session/types.rs
  • src/openhuman/agent/mod.rs
  • src/openhuman/channels/providers/web.rs
✅ Files skipped from review due to trivial changes (2)
  • src/openhuman/agent/harness/session/builder.rs
  • src/openhuman/agent/mod.rs
🚧 Files skipped from review as they are similar to previous changes (2)
  • src/openhuman/agent/harness/session/types.rs
  • app/src/services/chatService.ts

Comment on lines +754 to +758
/// Emit a progress event (fire-and-forget) if the sender is set.
fn emit_progress(&self, event: AgentProgress) {
    if let Some(ref tx) = self.on_progress {
        let _ = tx.try_send(event);
    }
}

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
set -euo pipefail

rg -n -C2 'mpsc::channel\(64\)|fn emit_progress|try_send|AgentProgress::' \
  src/openhuman/agent/harness/session/turn.rs \
  src/openhuman/channels/providers/web.rs

Repository: tinyhumansai/openhuman

Length of output: 6749


Add diagnostics when progress event delivery fails.

The emit_progress method silently drops events on channel Full or Closed errors via let _ = tx.try_send(event). This violates the logging guideline for error handling paths on external calls and means progress state transitions (tool calls, iterations, completion) can be lost without any trace, leaving the web UI out of sync. Add logging for send failures at minimum; consider a bounded queue or retry logic for critical events like tool completion.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/openhuman/agent/harness/session/turn.rs` around lines 754 - 758, The
emit_progress method currently discards errors from tx.try_send(event) which can
hide Full or Closed failures; update emit_progress (and the on_progress send
path) to capture the Result from tx.try_send and log failures including the
error and the event context (use the existing logger or processLogger),
distinguishing Closed vs Full so Closed logs at warning/error and Full logs at
debug/info; optionally consider backing off and retrying critical events (e.g.,
tool completion) or switching to a bounded buffer with drop policy, but
minimally ensure every try_send error is logged with error details and the
AgentProgress identifier in emit_progress.

Comment on lines +264 to +274
// Wire up a real-time progress channel so tool calls, iterations,
// and sub-agent events are emitted to the web channel as they happen
// (instead of retroactively after the loop finishes).
let (progress_tx, progress_rx) = tokio::sync::mpsc::channel(64);
agent.set_on_progress(Some(progress_tx));
spawn_progress_bridge(
    progress_rx,
    client_id.to_string(),
    thread_id.to_string(),
    request_id.to_string(),
);

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
set -euo pipefail

rg -n -C3 'struct InFlightEntry|spawn_progress_bridge|handle\.abort\(|in_flight\.insert|remove\(&map_key\)' \
  src/openhuman/channels/providers/web.rs

Repository: tinyhumansai/openhuman

Length of output: 1921


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Read the spawn_progress_bridge function and surrounding context
sed -n '300,350p' src/openhuman/channels/providers/web.rs

Repository: tinyhumansai/openhuman

Length of output: 2214


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Read more of spawn_progress_bridge to see full event handling
sed -n '300,450p' src/openhuman/channels/providers/web.rs

Repository: tinyhumansai/openhuman

Length of output: 6690


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Continue reading to find the end of spawn_progress_bridge function
sed -n '450,480p' src/openhuman/channels/providers/web.rs

Repository: tinyhumansai/openhuman

Length of output: 1363


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Get the end of the function
sed -n '475,495p' src/openhuman/channels/providers/web.rs

Repository: tinyhumansai/openhuman

Length of output: 745


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Find and read the publish_web_channel_event function
rg -n 'fn publish_web_channel_event' src/openhuman/channels/providers/web.rs -A 30

Repository: tinyhumansai/openhuman

Length of output: 952


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Verify the cancellation flow: when progress_tx is dropped and when events would still flush
# Check run_chat_task to see when progress_tx is set and when it gets dropped
sed -n '240,290p' src/openhuman/channels/providers/web.rs

Repository: tinyhumansai/openhuman

Length of output: 1653




🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Verify the exact cancellation flow in cancel_chat
sed -n '195,210p' src/openhuman/channels/providers/web.rs

Repository: tinyhumansai/openhuman

Length of output: 574


Prevent stale progress events after request cancellation.

spawn_progress_bridge() spawns a detached task that drains progress events and publishes them with the captured request_id. When cancel_chat() aborts the main inference task, the bridge task continues running independently. If buffered progress events remain in the channel, they will be drained and published with the cancelled request_id after the cancellation, breaking request isolation.

Either store and abort the bridge JoinHandle alongside the main task, or guard event publishing against the current in-flight request before emitting.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/openhuman/channels/providers/web.rs` around lines 264 - 274, The progress
bridge currently drains buffered progress_rx after the inference task is
cancelled, causing stale events to be published for a cancelled request; modify
the code that creates and runs the bridge so it can be canceled or validated:
have spawn_progress_bridge return a JoinHandle and store it alongside the main
inference task (or return a cancellable future) so cancel_chat can abort the
bridge handle, or add a request validation check inside the bridge’s publishing
loop that compares the captured request_id against the current in-flight request
before emitting; update agent.set_on_progress usage to ensure progress_tx is
closed/removed when the bridge is aborted to avoid draining leftover events.

The e2e test expected `chat_done` as the first SSE event, but now
real-time progress events (inference_start, iteration_start) are
emitted before it. Use `read_sse_event_by_type` to skip progress
events and wait for the terminal `chat_done` event.

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🧹 Nitpick comments (1)
tests/json_rpc_e2e.rs (1)

444-486: Prefer removing unused SSE helper instead of suppressing dead code.

read_first_sse_event is now unused (Line 444). Consider deleting it, or extracting shared parsing into one helper used by both paths.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/json_rpc_e2e.rs` around lines 444 - 486, The helper
read_first_sse_event is unused; either delete the entire function (and its
#[allow(dead_code)] annotation) to remove dead code, or refactor the parsing
logic into a shared helper (e.g., parse_sse_payload or
parse_sse_events_from_stream) that accepts a bytes stream or buffered text and
returns serde_json::Value, then replace duplicate parsing in other test paths
with calls to that new helper and remove read_first_sse_event. Ensure references
are updated for the function name read_first_sse_event and that
serde_json::from_str error handling is preserved in the consolidated helper.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: a3d8d80c-9aaf-4e9e-88dd-77f13a0b345f

📥 Commits

Reviewing files that changed from the base of the PR and between 82dcc16 and 241f070.

📒 Files selected for processing (1)
  • tests/json_rpc_e2e.rs

Comment on lines +491 to +534
async fn read_sse_event_by_type(events_url: &str, target_event: &str) -> Value {
    let client = reqwest::Client::builder()
        .timeout(Duration::from_secs(120))
        .build()
        .expect("client");
    let resp = client
        .get(events_url)
        .send()
        .await
        .unwrap_or_else(|e| panic!("GET {events_url}: {e}"));
    assert!(
        resp.status().is_success(),
        "SSE HTTP error {} for {}",
        resp.status(),
        events_url
    );

    let mut stream = resp.bytes_stream();
    let mut buffer = String::new();
    while let Some(item) = stream.next().await {
        let chunk = item.unwrap_or_else(|e| panic!("sse stream read failed: {e}"));
        let text = std::str::from_utf8(&chunk).unwrap_or("");
        buffer.push_str(text);
        while let Some(idx) = buffer.find("\n\n") {
            let block = buffer[..idx].to_string();
            buffer = buffer[idx + 2..].to_string();
            let mut data_lines = Vec::new();
            for line in block.lines() {
                if let Some(data) = line.strip_prefix("data:") {
                    data_lines.push(data.trim_start());
                }
            }
            if !data_lines.is_empty() {
                let payload = data_lines.join("\n");
                let value: Value = serde_json::from_str(&payload)
                    .unwrap_or_else(|e| panic!("invalid sse data json: {e}"));
                if value.get("event").and_then(Value::as_str) == Some(target_event) {
                    return value;
                }
            }
        }
    }
    panic!("SSE stream ended before receiving '{target_event}' event");
}

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Verification: ensure fragile chunk decoding/framing patterns are removed.
rg -n 'from_utf8\(&chunk\)\.unwrap_or\(""\)' tests/json_rpc_e2e.rs
rg -n 'find\("\\n\\n"\)' tests/json_rpc_e2e.rs

Repository: tinyhumansai/openhuman

Length of output: 309


Harden SSE parsing against chunked UTF-8 fragmentation and frame delimiters.

Lines 512 and 514 contain a critical issue: decoding each chunk separately with from_utf8(&chunk).unwrap_or("") silently drops bytes when a multibyte UTF-8 character is split across chunk boundaries, causing sporadic JSON parse panics. Additionally, framing only handles \n\n, not the also-valid \r\n\r\n separator per SSE spec. The same vulnerability exists in read_first_sse_event at lines 466 and 468.

Buffer at the byte level, decode once after collecting complete frames:

Proposed fix
 async fn read_sse_event_by_type(events_url: &str, target_event: &str) -> Value {
@@
-    let mut stream = resp.bytes_stream();
-    let mut buffer = String::new();
+    let mut stream = resp.bytes_stream();
+    let mut buffer: Vec<u8> = Vec::new();
     while let Some(item) = stream.next().await {
         let chunk = item.unwrap_or_else(|e| panic!("sse stream read failed: {e}"));
-        let text = std::str::from_utf8(&chunk).unwrap_or("");
-        buffer.push_str(text);
-        while let Some(idx) = buffer.find("\n\n") {
-            let block = buffer[..idx].to_string();
-            buffer = buffer[idx + 2..].to_string();
+        buffer.extend_from_slice(&chunk);
+        while let Some((idx, sep_len)) = find_sse_separator(&buffer) {
+            let block = buffer[..idx].to_vec();
+            buffer.drain(..idx + sep_len);
+            let block = String::from_utf8_lossy(&block);
             let mut data_lines = Vec::new();
             for line in block.lines() {
                 if let Some(data) = line.strip_prefix("data:") {
                     data_lines.push(data.trim_start());
                 }
@@
     }
     panic!("SSE stream ended before receiving '{target_event}' event");
 }
+
+fn find_sse_separator(buf: &[u8]) -> Option<(usize, usize)> {
+    if let Some(i) = buf.windows(2).position(|w| w == b"\n\n") {
+        return Some((i, 2));
+    }
+    buf.windows(4).position(|w| w == b"\r\n\r\n").map(|i| (i, 4))
+}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/json_rpc_e2e.rs` around lines 491 - 534, The SSE parsing currently
decodes each incoming chunk with from_utf8 which can drop bytes when a multibyte
UTF-8 sequence is split across chunk boundaries and only looks for "\n\n"
frames; change read_sse_event_by_type (and read_first_sse_event) to buffer at
the byte level (Vec<u8>), append each chunk as bytes, search the buffer for
either b"\r\n\r\n" or b"\n\n" to extract complete frame bytes, then decode the
extracted frame once with std::str::from_utf8 (handling the Result rather than
silently using ""), parse lines for "data:" prefixes, and only then json-decode
the joined payload; this preserves UTF-8 integrity across chunk boundaries and
accepts both CRLF and LF SSE frame terminators.

@senamakel senamakel merged commit 8635ac1 into tinyhumansai:main Apr 12, 2026
8 of 9 checks passed
